package vip.shuai7boy.trafficTemp.util;

import java.util.Objects;

import com.alibaba.fastjson.JSONObject;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import vip.shuai7boy.trafficTemp.conf.ConfigurationManager;
import vip.shuai7boy.trafficTemp.constant.Constants;
import vip.spark.spark.test.MockData;

/**
 * Static helpers for preparing mock data and for loading rows of the
 * {@code monitor_flow_action} table as RDDs.
 *
 * <p>All query helpers bind the task parameters as literal values through the
 * Column API instead of concatenating them into a SQL string, so a parameter
 * containing a quote character can neither break nor alter the query.
 */
public class SparkUtils {
    /** Name of the table (or temporary view) queried by the helpers in this class. */
    private static final String MONITOR_FLOW_TABLE = "monitor_flow_action";

    /**
     * Generates mock data when the {@code Constants.SPARK_LOCAL} configuration flag is true;
     * does nothing otherwise.
     *
     * @param sc    Spark context handed to the mock data generator
     * @param spark Spark session handed to the mock data generator
     */
    public static void mockData(JavaSparkContext sc, SparkSession spark) {
        if (ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) {
            MockData.mock(sc, spark);
        }
    }

    /**
     * Loads every {@code monitor_flow_action} row whose {@code date} lies within the
     * inclusive range given by the task parameters.
     *
     * <p>The result is not repartitioned; callers that need more parallelism can
     * repartition the returned RDD themselves.
     *
     * @param spark                active Spark session
     * @param taskParamsJsonObject task parameters; read via {@code Constants.PARAM_START_DATE}
     *                             and {@code Constants.PARAM_END_DATE}
     * @return the matching rows
     */
    public static JavaRDD<Row> getCameraRDDByDateRange(SparkSession spark, JSONObject taskParamsJsonObject) {
        return monitorFlowInDateRange(spark, taskParamsJsonObject).javaRDD();
    }

    /**
     * Loads the {@code monitor_flow_action} rows within the task's inclusive date range
     * whose {@code car} value is one of the cars listed in the task parameters.
     *
     * <p>The car list is read via {@code Constants.FIELD_CARS} and split on commas; entries
     * are matched exactly as given (no trimming). The result is not repartitioned.
     *
     * @param spark                active Spark session
     * @param taskParamsJsonObject task parameters; read via {@code Constants.PARAM_START_DATE},
     *                             {@code Constants.PARAM_END_DATE} and {@code Constants.FIELD_CARS}
     * @return the matching rows
     * @throws NullPointerException if the task parameters contain no car list
     */
    public static JavaRDD<Row> getCameraRDDByDateRangeAndCars(SparkSession spark, JSONObject taskParamsJsonObject) {
        String cars = ParamUtils.getParam(taskParamsJsonObject, Constants.FIELD_CARS);
        Objects.requireNonNull(cars, "task parameter '" + Constants.FIELD_CARS + "' is required");
        String[] carArr = cars.split(",");

        Dataset<Row> inRange = monitorFlowInDateRange(spark, taskParamsJsonObject);
        // The cast makes each array element one IN-list value instead of passing
        // the array itself as a single value.
        return inRange.filter(inRange.col("car").isin((Object[]) carArr)).javaRDD();
    }

    /**
     * Loads the {@code monitor_flow_action} rows within the task's inclusive date range
     * whose {@code area_id} equals the given area.
     *
     * <p>The result is not repartitioned, and no Spark action is triggered here; the
     * returned RDD is evaluated lazily by the caller.
     *
     * @param spark                active Spark session
     * @param taskParamsJsonObject task parameters; read via {@code Constants.PARAM_START_DATE}
     *                             and {@code Constants.PARAM_END_DATE}
     * @param a                    area id to match
     * @return the matching rows
     */
    public  static JavaRDD<Row> getCameraRDDByDateRangeAndArea(SparkSession spark, JSONObject taskParamsJsonObject,
                                                               String a) {
        Dataset<Row> inRange = monitorFlowInDateRange(spark, taskParamsJsonObject);
        return inRange.filter(inRange.col("area_id").equalTo(a)).javaRDD();
    }

    /**
     * Returns the {@code monitor_flow_action} rows whose {@code date} is between the task's
     * start and end date, both inclusive.
     */
    private static Dataset<Row> monitorFlowInDateRange(SparkSession spark, JSONObject taskParamsJsonObject) {
        String startDate = ParamUtils.getParam(taskParamsJsonObject, Constants.PARAM_START_DATE);
        String endDate = ParamUtils.getParam(taskParamsJsonObject, Constants.PARAM_END_DATE);

        Dataset<Row> flows = spark.table(MONITOR_FLOW_TABLE);
        // Values are passed as literals, never parsed as SQL text.
        return flows.filter(
                flows.col("date").geq(startDate)
                        .and(flows.col("date").leq(endDate)));
    }

}
